package com.godenwater.recv.manager;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.serial.SerialAddress;
import org.apache.mina.transport.serial.SerialConnector;
import org.apache.mina.transport.serial.SerialAddress.DataBits;
import org.apache.mina.transport.serial.SerialAddress.FlowControl;
import org.apache.mina.transport.serial.SerialAddress.Parity;
import org.apache.mina.transport.serial.SerialAddress.StopBits;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.godenwater.core.container.BasicModule;
import com.godenwater.recv.server.all.RtuCodecFactory;
import com.godenwater.recv.server.all.RtuConfig;
import com.godenwater.recv.server.all.RtuServer;
import com.godenwater.recv.server.all.ServerDataHandler;

public class ConnectionManager extends BasicModule {

    private static Logger logger = LoggerFactory
            .getLogger(ConnectionManager.class);

    private static int DEFAULT_SERVER_PORT = 20000;
    // 创建一个非阻塞式的服务端，默认打开一个创建此对象
    private NioSocketAcceptor tcpAcceptor;
    private NioSocketAcceptor tcpAcceptor2;
    private NioDatagramAcceptor udpAcceptor;// = new NioDatagramAcceptor();
    private NioDatagramAcceptor udpAcceptor2;

    private IoConnector gsmConnector;// = new SerialConnector();
    private IoConnector pstnConnector;// = new SerialConnector();

    private IoConnector bdConnector;

    public ConnectionManager() {
        super("Connection Manager");
    }

    @Override
    public void start() {
        super.start();
        startTcpListener();
        startTcp2Listener();
        startUdpListener();
        startUdp2Listener();
        startGsmListener();
        startBdListener();
        // startPstnListener();
    }

    @Override
    public void stop() {
        super.stop();
        stopTcpListener();
        stopTcp2Listener();
        stopUdpListener();
        stopUdp2Listener();
        stopGsmListener();
        stopBdListener();
        // stopPstnListener();
    }

    public boolean isTcpListenerEnabled() {
        return true;
    }

    public int getTcpListenerPort() {
        return DEFAULT_SERVER_PORT;// JiveGlobals.getIntProperty("xmpp.server.socket.port",
        // DEFAULT_SERVER_PORT);
    }

    public void setTcpListenerPort(int port) {
        if (port == getTcpListenerPort()) {
            // Ignore new setting
            return;
        }
        // JiveGlobals.setProperty("xmpp.server.socket.port",
        // String.valueOf(port));
        // Stop the port listener for s2s communication
        stopTcpListener();
        if (isTcpListenerEnabled()) {
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Start the port listener for s2s communication
            startTcpListener();
        }
    }

    private void startTcpListener() {

        if (RtuConfig.isHydroGprsTcpEnabled()) {

            tcpAcceptor = new NioSocketAcceptor();
            int PORT = RtuConfig.getHydroGprsTcpPort();
            System.out.println(">> tcp port " + PORT);
            // The logger, if needed. Commented atm
            DefaultIoFilterChainBuilder chain = tcpAcceptor.getFilterChain();
            // chain.addLast("logger", new LoggingFilter());
            chain.addLast("protocol", new ProtocolCodecFilter(new RtuCodecFactory(
                    true, "tcp")));
            //chain.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));

            // LoggingFilter loggingFilter = new LoggingFilter();
            // chain.addLast("logging", loggingFilter);

            tcpAcceptor.setHandler(new ServerDataHandler(RtuServer.getInstance(), "tcp", PORT));
            // 设置读取数据的缓冲区大小
            tcpAcceptor.getSessionConfig().setReadBufferSize(602); // 针对最大报文的缓冲设置

            // 读写通道30秒内无操作进入空闲状态
            tcpAcceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            //设置50秒超时
            tcpAcceptor.getSessionConfig().setWriteTimeout(20);
            SocketSessionConfig dcfg = tcpAcceptor.getSessionConfig();
            dcfg.setReuseAddress(true);

            try {
                //tcpAcceptor.bind(new InetSocketAddress(PORT));
                String ip = RtuConfig.getHydroGprsTcpIp();
                if (StringUtils.isBlank(ip) || StringUtils.equals(ip, "0")) {
                    tcpAcceptor.bind(new InetSocketAddress(PORT));
                } else {
                    InetAddress inetAddress = InetAddress.getByName(ip);
                    tcpAcceptor.bind(new InetSocketAddress(inetAddress, PORT));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private void stopTcpListener() {
        if (tcpAcceptor != null) {
            System.out.println("正在关闭TCP连接");
            tcpAcceptor.unbind();
            tcpAcceptor = null;
        }
    }

    private void startTcp2Listener() {
        if (RtuConfig.isHydroGprsTcp2Enabled()) {
            tcpAcceptor2 = new NioSocketAcceptor();
            int PORT = RtuConfig.getHydroGprsTcpPort2();
            System.out.println(">> tcp2 port " + PORT);
            // The logger, if needed. Commented atm
            DefaultIoFilterChainBuilder chain = tcpAcceptor2.getFilterChain();
            // chain.addLast("logger", new LoggingFilter());
            chain.addLast("protocol", new ProtocolCodecFilter(new RtuCodecFactory(
                    true, "tcp")));

            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));
            //chain.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
            // LoggingFilter loggingFilter = new LoggingFilter();
            // chain.addLast("logging", loggingFilter);

            tcpAcceptor2.setHandler(new ServerDataHandler(RtuServer.getInstance(), "tcp", PORT));
            // 设置读取数据的缓冲区大小
            tcpAcceptor2.getSessionConfig().setReadBufferSize(604); // 针对最大报文的缓冲设置

            // 读写通道30秒内无操作进入空闲状态
            tcpAcceptor2.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            //设置50秒超时
            tcpAcceptor2.getSessionConfig().setWriteTimeout(20);
            SocketSessionConfig dcfg = tcpAcceptor2.getSessionConfig();
            dcfg.setReuseAddress(true);

            try {
                //tcpAcceptor2.bind(new InetSocketAddress(PORT));

                String ip = RtuConfig.getHydroGprsTcpIp();
                if (StringUtils.isBlank(ip) || StringUtils.equals(ip, "0")) {
                    tcpAcceptor2.bind(new InetSocketAddress(PORT));
                } else {
                    InetAddress inetAddress = InetAddress.getByName(ip);
                    tcpAcceptor2.bind(new InetSocketAddress(inetAddress, PORT));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private void stopTcp2Listener() {
        if (tcpAcceptor2 != null) {
            System.out.println("正在关闭TCP连接");
            tcpAcceptor2.unbind();
            tcpAcceptor2 = null;
        }
    }
    // ---------------------------------------------

    private void startUdpListener() {

        if (RtuConfig.isHydroGprsUdpEnabled()) {

            int PORT = RtuConfig.getHydroGprsUdpPort();
            System.out.println(">> udp port " + PORT);
            udpAcceptor = new NioDatagramAcceptor();

            // The logger, if needed. Commented atm
            DefaultIoFilterChainBuilder chain = udpAcceptor.getFilterChain();
            chain.addLast("protocol", new ProtocolCodecFilter(new RtuCodecFactory(
                    true, "udp")));

            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));

            // chain.addLast("logging", new LoggingFilter());

            udpAcceptor.setHandler(new ServerDataHandler(RtuServer.getInstance(), "udp", PORT));
            // 设置读取数据的缓冲区大小
            udpAcceptor.getSessionConfig().setReadBufferSize(1024);
            // 读写通道30秒内无操作进入空闲状态
            udpAcceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);

            DatagramSessionConfig dcfg = udpAcceptor.getSessionConfig();
            dcfg.setReuseAddress(true);
            try {
                //InetAddress inetAddress = InetAddress.getByName("172.1.1.2");
                //udpAcceptor.bind(new InetSocketAddress(inetAddress,PORT));
                //udpAcceptor.bind(new InetSocketAddress(PORT));

                String ip = RtuConfig.getHydroGprsUdpIp();
                if (StringUtils.isBlank(ip) || StringUtils.equals(ip, "0")) {
                    udpAcceptor.bind(new InetSocketAddress(PORT));
                } else {
                    InetAddress inetAddress = InetAddress.getByName(ip);
                    udpAcceptor.bind(new InetSocketAddress(inetAddress, PORT));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    private void stopUdpListener() {
        if (udpAcceptor != null) {
            udpAcceptor.unbind();
            udpAcceptor = null;
        }
    }

    private void startUdp2Listener() {

        if (RtuConfig.isHydroGprsUdp2Enabled()) {

            int PORT = RtuConfig.getHydroGprsUdpPort2();
            System.out.println(">> udp port " + PORT);
            udpAcceptor2 = new NioDatagramAcceptor();

            // The logger, if needed. Commented atm
            DefaultIoFilterChainBuilder chain = udpAcceptor2.getFilterChain();
            chain.addLast("protocol", new ProtocolCodecFilter(new RtuCodecFactory(
                    true, "udp")));

            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));

            // chain.addLast("logging", new LoggingFilter());

            udpAcceptor2.setHandler(new ServerDataHandler(RtuServer.getInstance(), "udp", PORT));
            // 设置读取数据的缓冲区大小
            udpAcceptor2.getSessionConfig().setReadBufferSize(1024);
            // 读写通道30秒内无操作进入空闲状态
            udpAcceptor2.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);

            DatagramSessionConfig dcfg = udpAcceptor2.getSessionConfig();
            dcfg.setReuseAddress(true);
            try {
                //udpAcceptor2.bind(new InetSocketAddress(PORT));

                String ip = RtuConfig.getHydroGprsUdpIp();
                if (StringUtils.isBlank(ip) || StringUtils.equals(ip, "0")) {
                    udpAcceptor2.bind(new InetSocketAddress(PORT));
                } else {
                    InetAddress inetAddress = InetAddress.getByName(ip);
                    udpAcceptor2.bind(new InetSocketAddress(inetAddress, PORT));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    private void stopUdp2Listener() {
        if (udpAcceptor2 != null) {
            udpAcceptor2.unbind();
            udpAcceptor2 = null;
        }
    }


    private void startGsmListener() {

        if (RtuConfig.isHydroGsmEnabled()) {
            gsmConnector = new SerialConnector();
            // /dev/ttyS0

            DataBits db = DataBits.DATABITS_8;
            int dataBits = RtuConfig.getHydroGsmByteSize();
            if (dataBits == 5) {
                db = DataBits.DATABITS_5;
            }
            if (dataBits == 6) {
                db = DataBits.DATABITS_6;
            }
            if (dataBits == 7) {
                db = DataBits.DATABITS_7;
            }

            Parity p = Parity.NONE;
            String parity = RtuConfig.getHydroGsmParity();
            if (StringUtils.equalsIgnoreCase(parity, "Even")) {
                p = Parity.EVEN;
            }
            if (StringUtils.equalsIgnoreCase(parity, "Odd")) {
                p = Parity.ODD;
            }
            if (StringUtils.equalsIgnoreCase(parity, "Mark")) {
                p = Parity.MARK;
            }
            if (StringUtils.equalsIgnoreCase(parity, "Space")) {
                p = Parity.SPACE;
            }

            StopBits sb = StopBits.BITS_1;
            String stopBits = RtuConfig.getHydroGsmStopbits();
            if (StringUtils.equalsIgnoreCase(stopBits, "1.5")) {
                sb = StopBits.BITS_1_5;
            }
            if (StringUtils.equalsIgnoreCase(parity, "2")) {
                sb = StopBits.BITS_2;
            }

            SerialAddress portAddress = new SerialAddress(
                    RtuConfig.getHydroGsmSerial(),
                    RtuConfig.getHydroGsmBaudRate(), db, sb, p,
                    FlowControl.NONE);

            // ==========================================

            DefaultIoFilterChainBuilder chain = gsmConnector.getFilterChain();
            // chain.addLast("logger", new LoggingFilter());
            chain.addLast("protocol", new ProtocolCodecFilter(
                    new RtuCodecFactory(true, "gsm")));

            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));

            // LoggingFilter loggingFilter = new LoggingFilter();
            // chain.addLast("logging", loggingFilter);

            gsmConnector.setHandler(new ServerDataHandler(RtuServer.getInstance(), "COMM", 15210));
            // 设置读取数据的缓冲区大小
            gsmConnector.getSessionConfig().setReadBufferSize(
                    RtuConfig.getBufferSize()); // 针对最大报文的缓冲设置

            // 读写通道30秒内无操作进入空闲状态
            gsmConnector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,
                    40);

            // ===========================================

            try {
                ConnectFuture future = gsmConnector.connect(portAddress);

                future.await();

                // IoSession sessin = future.getSession();

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void stopGsmListener() {
        if (gsmConnector != null) {
            gsmConnector.dispose();
            gsmConnector = null;
        }
    }

    private void startPstnListener() {
        if (RtuConfig.isHydroPstnEnabled()) {
            pstnConnector = new SerialConnector();
            // /dev/ttyS0
            SerialAddress portAddress = new SerialAddress(
                    RtuConfig.getHydroPstnSerial(),
                    RtuConfig.getHydroPstnBaudRate(), DataBits.DATABITS_8,
                    StopBits.BITS_1, Parity.NONE, FlowControl.NONE);

            // ==========================================

            DefaultIoFilterChainBuilder chain = pstnConnector.getFilterChain();
            // chain.addLast("logger", new LoggingFilter());
            chain.addLast("protocol", new ProtocolCodecFilter(
                    new RtuCodecFactory(true, "pstn")));

            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));

            // LoggingFilter loggingFilter = new LoggingFilter();
            // chain.addLast("logging", loggingFilter);

            pstnConnector.setHandler(new ServerDataHandler(RtuServer.getInstance(), "PSTN", 15210));
            // 设置读取数据的缓冲区大小
            pstnConnector.getSessionConfig().setReadBufferSize(
                    RtuConfig.getBufferSize()); // 针对最大报文的缓冲设置

            // 读写通道30秒内无操作进入空闲状态
            pstnConnector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,
                    40);

            // ===========================================

            try {
                ConnectFuture future = pstnConnector.connect(portAddress);

                future.await();

                // IoSession sessin = future.getSession();

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void stopPstnListener() {
        if (pstnConnector != null) {
            pstnConnector.dispose();
            pstnConnector = null;
        }
    }

    // 开启北斗连接
    private void startBdListener() {
        if (RtuConfig.isHydroBdEnabled()) {
            bdConnector = new SerialConnector();
            // /dev/ttyS0

            //
            DataBits db = DataBits.DATABITS_8;
            int dataBits = RtuConfig.getHydroBdByteSize();
            if (dataBits == 5) {
                db = DataBits.DATABITS_5;
            }
            if (dataBits == 6) {
                db = DataBits.DATABITS_6;
            }
            if (dataBits == 7) {
                db = DataBits.DATABITS_7;
            }

            Parity p = Parity.NONE;
            String parity = RtuConfig.getHydroBdParity();
            if (StringUtils.equalsIgnoreCase(parity, "Even")) {
                p = Parity.EVEN;
            }
            if (StringUtils.equalsIgnoreCase(parity, "Odd")) {
                p = Parity.ODD;
            }
            if (StringUtils.equalsIgnoreCase(parity, "Mark")) {
                p = Parity.MARK;
            }
            if (StringUtils.equalsIgnoreCase(parity, "Space")) {
                p = Parity.SPACE;
            }

            StopBits sb = StopBits.BITS_1;
            String stopBits = RtuConfig.getHydroBdStopbits();
            if (StringUtils.equalsIgnoreCase(stopBits, "1.5")) {
                sb = StopBits.BITS_1_5;
            }
            if (StringUtils.equalsIgnoreCase(parity, "2")) {
                sb = StopBits.BITS_2;
            }

            SerialAddress portAddress = new SerialAddress(
                    RtuConfig.getHydroBdSerial(),
                    RtuConfig.getHydroBdBaudRate(), db, sb, p, FlowControl.NONE);

            // ==========================================

            DefaultIoFilterChainBuilder chain = bdConnector.getFilterChain();
            // chain.addLast("logger", new LoggingFilter());
            chain.addLast("protocol", new ProtocolCodecFilter(
                    new RtuCodecFactory(true, "bd")));

            // 增加一个数据处理缓存线程池
            // chain.addLast("dbThreadpool", new
            // ExecutorFilter(Executors.newCachedThreadPool()));

            // LoggingFilter loggingFilter = new LoggingFilter();
            // chain.addLast("logging", loggingFilter);

            bdConnector.setHandler(new ServerDataHandler(RtuServer.getInstance(), "BD", 15210));
            // 设置读取数据的缓冲区大小
            bdConnector.getSessionConfig().setReadBufferSize(
                    RtuConfig.getBufferSize()); // 针对最大报文的缓冲设置

            // 读写通道30秒内无操作进入空闲状态
            bdConnector.getSessionConfig()
                    .setIdleTime(IdleStatus.BOTH_IDLE, 40);

            // ===========================================

            try {
                ConnectFuture future = bdConnector.connect(portAddress);

                future.await();

                // IoSession sessin = future.getSession();

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    private void stopBdListener() {
        if (bdConnector != null) {
            bdConnector.dispose();
            bdConnector = null;
        }
    }

    public static void main(String[] args) {
        SerialConnector connector = new SerialConnector();

        DefaultIoFilterChainBuilder chain = connector.getFilterChain();
        chain.addLast("protocol", new ProtocolCodecFilter(new RtuCodecFactory(
                true, "test")));
        LoggingFilter loggingFilter = new LoggingFilter();
        connector.getFilterChain().addLast("logging", loggingFilter);

    }

}
